package com.chenjl.trace.transport.extflume;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Flume source that polls records from a Kafka topic and forwards them to
 * the configured channel(s) as Flume events (record value becomes the event
 * body; the record key travels in the {@code messageId} header).
 * 2018-10-23 20:08:26
 * @author chenjinlong
 */
public class KafkaSource extends AbstractSource implements Configurable,PollableSource {
	// SLF4J convention: one static logger per class.
	private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);

	private String bootstrapServers = null;
	private String topicName = null;
	private String groupId = null;
	private Consumer<String,byte[]> consumer = null;

	/**
	 * Reads the Kafka connection settings from the Flume context, builds a
	 * consumer and subscribes it to the configured topic.
	 *
	 * @param context Flume agent configuration for this source; must supply
	 *                {@code bootstrap.servers}, {@code topic.name} and
	 *                {@code group.id}
	 * @throws IllegalArgumentException if a required property is missing
	 */
	@Override
	public void configure(Context context) {
		this.bootstrapServers = context.getString("bootstrap.servers");
		this.topicName = context.getString("topic.name");
		this.groupId = context.getString("group.id");

		// Fail fast on missing configuration instead of letting the Kafka
		// client fail later with a less obvious error at poll time.
		if (bootstrapServers == null || topicName == null || groupId == null) {
			throw new IllegalArgumentException(
					"bootstrap.servers, topic.name and group.id are required");
		}

		log.info("KafkaSource读取配置-->>---bootstrapServers : {} , topicName :{}, groupId : {}",bootstrapServers, topicName,groupId);

		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
		// Offsets auto-commit every second: after a crash, up to ~1s of
		// already-delivered records may be re-consumed (at-least-once).
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
		props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ByteArrayDeserializer.class);

		consumer = new KafkaConsumer<String,byte[]>(props);
		consumer.subscribe(Arrays.asList(topicName));
	}

	/**
	 * Polls Kafka for up to five seconds and hands all received records to
	 * the channel processor as one batch.
	 *
	 * @return {@link Status#READY} on success (even with zero records),
	 *         {@link Status#BACKOFF} if polling or channel delivery failed
	 */
	@Override
	public Status process() throws EventDeliveryException {
		try {
			ConsumerRecords<String,byte[]> consumerRecords = consumer.poll(Duration.ofSeconds(5));
			log.info("KafkaSource process -->>--- ... 拉取到消息数目 : {}",consumerRecords.count());

			// Presize: the batch size is known up front.
			List<Event> events = new ArrayList<Event>(consumerRecords.count());

			for(ConsumerRecord<String,byte[]> consumerRecord : consumerRecords) {
				String messageId = consumerRecord.key();
				byte[] bytes = consumerRecord.value();

				Map<String, String> headers = new HashMap<String, String>();
				headers.put("timestamp",String.valueOf(System.currentTimeMillis()));
				// The key may be null; String.valueOf maps that to "null",
				// preserving the original header behaviour.
				headers.put("messageId",String.valueOf(messageId));

				Event event = new SimpleEvent();
				event.setBody(bytes);
				event.setHeaders(headers);
				events.add(event);
			}

			if(!events.isEmpty()) {
				//从source传递到channel
				super.getChannelProcessor().processEventBatch(events);
			}

			return Status.READY;
		}
		catch(Exception e) {
			// Previously swallowed silently — log so delivery/channel
			// failures are visible to operators before backing off.
			log.error("KafkaSource process failed, backing off", e);
			return Status.BACKOFF;
		}
	}

	/**
	 * Commits outstanding offsets, closes the consumer and stops the source.
	 * NOTE(review): Flume calls stop() on the lifecycle thread while process()
	 * runs on the source-runner thread, but KafkaConsumer is not thread-safe —
	 * confirm the runner is halted before this executes.
	 */
	@Override
	public synchronized void stop() {
		// Guard against stop() before configure(), or after a failed configure.
		if (consumer != null) {
			try {
				consumer.commitSync();
			}
			catch(Exception e) {
				// Best effort: a failed final commit must not prevent close()
				// (auto-commit has already persisted most progress anyway).
				log.warn("KafkaSource final offset commit failed", e);
			}
			finally {
				consumer.close();
				consumer = null;
			}
		}

		log.info("KafkaSource关闭-->>---");
		super.stop();
	}

	/** @return 0 — no incremental backoff between failed polls. */
	@Override
	public long getBackOffSleepIncrement() {
		return 0;
	}

	/** @return 0 — no upper bound on the backoff sleep. */
	@Override
	public long getMaxBackOffSleepInterval() {
		return 0;
	}
}